/**
 * @file 
 *
 * Distributed client benchmark
 */

#include <filesystem>
#include <vector>
#include <unordered_map>
#include <map>
#include <unordered_set>
#include <random>
#include <fstream>
#include <sstream>
#include <memory>
#include <thread>
#include <atomic>
#include <chrono>
using namespace std::chrono_literals;
#include <numeric>

#include "common/boost_log_helper.hpp"
#include <boost/program_options.hpp>

#include "defaults.hpp"
#include "ycsb_parser.hpp"
#include "client.hpp"
#include "perf/client_perf.hpp"
#include "../tweaks.hpp"

using namespace std;


/**
 * Benchmark follower entry point.
 *
 * Flow, as implemented below:
 *  1. parse command-line options and locate the configuration file;
 *  2. load the YCSB run trace and build one randomly resampled trace per
 *     worker thread;
 *  3. spawn the worker threads, each owning its own gestalt::Client, which
 *     spin until released;
 *  4. poll the keys "start_at" and "end_at" through a coordinating client;
 *     both values are compared against microseconds since the
 *     system_clock epoch;
 *  5. release the workers once start_at is reached and tell them to stop
 *     once end_at is reached;
 *  6. optionally dump the recorded latencies and log the total number of
 *     completed operations.
 *
 * The lock file is created while this process is preparing, removed once the
 * per-thread traces are generated, re-created while the workload runs and
 * removed again before returning.
 *
 * @param argc number of command-line arguments
 * @param argv command-line arguments, argv[0] is used to derive the source
 *      directory for the default --ycsb-run and --lock paths
 * @return EXIT_SUCCESS on completion; the process exits with EXIT_FAILURE if
 *      no configuration file can be found
 * @throw std::runtime_error on invalid latency output path, empty run trace,
 *      lock file not removable, late thread initialization, or unexpected
 *      client errors
 */
int main(const int argc, const char **argv)
{
    const filesystem::path src_dir = 
        filesystem::absolute(argv[0]).parent_path().parent_path().parent_path();
    filesystem::path config_path;
    filesystem::path ycsb_run_path;
    string log_level;
    unsigned client_id;
    unsigned thread_nr_to_test;
    filesystem::path lock_file_path;
    filesystem::path record_latency_basedir;    // if empty don't record
    {
        namespace po = boost::program_options;
        po::options_description desc;
        desc.add_options()
            ("config,C", po::value(&config_path),
                "Configuration file, if not given, will search for "
                "/etc/gestalt/gestalt.conf, ./gestalt.conf, "
                "./etc/gestalt/gestalt.conf, whichever comes first.")
            ("log,L", po::value(&log_level)->default_value("info"),
                "Logging level (Boost).")
            ("id,i", po::value(&client_id)->required(), "specify client ID")
            ("thread-nr,t", po::value(&thread_nr_to_test)->default_value(16),
                "workload thread count on this client node")
            ("record-latency,R", po::value(&record_latency_basedir),
                "output directory of recorded per-operation latency, will not "
                "record if not given")
            // ("ycsb-load", po::value(&ycsb_load_path), "YCSB load output")
            ("ycsb-run", po::value(&ycsb_run_path)
                    ->default_value(src_dir/"build"/"runtime"/"run.ycsb"),
                "YCSB run output")
            ("lock", po::value(&lock_file_path)
                    ->default_value(src_dir/"build"/"runtime"/"follower-in-progress.lock"),
                "Lock file path")
            ;
        po::variables_map vm;
        po::store(po::parse_command_line(argc, argv, desc), vm);
        po::notify(vm);
    }

    set_boost_log_level(log_level);

    /* fall back to the first default configuration path that exists */
    if (config_path.empty()) {
        for (auto &p : gestalt::defaults::config_paths) {
            if (!filesystem::is_regular_file(p))
                continue;
            config_path = p;
            break;
        }
    }
    if (!filesystem::is_regular_file(config_path)) {
        BOOST_LOG_TRIVIAL(fatal) << "Cannot find configuration file";
        exit(EXIT_FAILURE);
    }

    const bool record_latency = !record_latency_basedir.empty();
    if (record_latency) {
        if (filesystem::exists(record_latency_basedir)
                && !filesystem::is_directory(record_latency_basedir)) {
            ostringstream what;
            what << record_latency_basedir << " exists but is not a directory";
            throw std::runtime_error(what.str());
        }
        /* clean-up dump directory, directory structure to be generated with
            fs::create_directories() */
        filesystem::remove_all(record_latency_basedir);
        filesystem::create_directories(record_latency_basedir);
    }

    /* create lock file if not exist, marks the follower is not yet ready */
    {
        ofstream lockf(lock_file_path);
    }

    /* load run trace */

    smdsbz::ycsb_parser::trace ycsb_run;
    {
        namespace yp = smdsbz::ycsb_parser;
        ycsb_run.reserve(1e5);
        yp::parse(ycsb_run_path, ycsb_run);
    }
    /* an empty trace would make the sampling range below underflow and the
        worker loop index an empty vector / take a modulo by zero */
    if (ycsb_run.empty()) {
        BOOST_LOG_TRIVIAL(fatal) << "YCSB run trace " << ycsb_run_path
            << " is empty";
        filesystem::remove(lock_file_path);
        throw std::runtime_error("YCSB run trace is empty");
    }
    BOOST_LOG_TRIVIAL(info) << "YCSB workload loaded";

    /* generate scrambled YCSB run trace for each thread, minimizing CPU cache
        miss impact introduced when copying user blob data into I/O request body */
    BOOST_LOG_TRIVIAL(info) << "Generating trace for each thread (total "
        << thread_nr_to_test << " threads for this client instance) ...";
    vector<decltype(ycsb_run)> thread_run(thread_nr_to_test);
    {
        std::random_device rd;
        std::default_random_engine re(rd());
        /* sample with replacement, every thread gets as many entries as the
            loaded trace has */
        std::uniform_int_distribution<std::size_t> dist(0, ycsb_run.size() - 1);
        for (auto &tt : thread_run) {
            tt.reserve(ycsb_run.size());
            for (std::size_t i = 0; i < ycsb_run.size(); i++)
                tt.push_back(ycsb_run.at(dist(re)));
        }
    }
    BOOST_LOG_TRIVIAL(info) << "Thread-specific trace generated";

    /* delete the lock file to mark follower ready, starter may be invoked */
    if (auto r = filesystem::remove(lock_file_path); !r) {
        BOOST_LOG_TRIVIAL(fatal) << "lock file " << lock_file_path << " not removed by me";
        throw std::runtime_error("lock file not removed by me");
    }

    /* run workload */

    /* written by the main thread, polled by every worker thread; must be
        atomic, volatile gives no inter-thread guarantees */
    std::atomic<bool> start_flag{false}, stop_flag{false};
    vector<unique_ptr<gestalt::perf::ClientLatencyRecorder>> thread_perf_lat;
    /* (re)create one latency recorder per worker thread */
    const auto reset_perf = [&] ()
    {
        thread_perf_lat.resize(thread_run.size());
        for (auto &pr : thread_perf_lat)
            pr.reset(new gestalt::perf::ClientLatencyRecorder);
    };
    /* mark the beginning of an operation on the matching (read / write)
        recorder channel */
    const auto start_perf_op = [&] (
        gestalt::perf::ClientLatencyRecorder &lat_rec,
        const std::remove_cvref_t<decltype(ycsb_run.front())> &op)
    {
        using Op = decltype(op.op);
        switch (op.op) {
        case Op::READ: {
            lat_rec.read_tick();
            break;
        }
        case Op::INSERT: case Op::UPDATE: {
            lat_rec.write_tick();
            break;
        }
        default:
            throw std::runtime_error("unknown op type");
        }
    };
    /* mark the end of an operation on the matching (read / write) recorder
        channel */
    const auto end_perf_op = [&] (
        gestalt::perf::ClientLatencyRecorder &lat_rec,
        const std::remove_cvref_t<decltype(ycsb_run.front())> &op)
    {
        using Op = decltype(op.op);
        switch (op.op) {
        case Op::READ: {
            lat_rec.read_tock();
            break;
        }
        case Op::INSERT: case Op::UPDATE: {
            lat_rec.write_tock();
            break;
        }
        default:
            throw std::runtime_error("unknown op type");
        }
    };


    vector<unsigned long long> thread_completed_ops(thread_nr_to_test, 0);
    /* worker body: replays this thread's trace in a loop, wrapping around at
        the end, until stop_flag is raised */
    const auto thread_test_fn = [&] (const unsigned thread_id) {
        auto &completed_ops = thread_completed_ops.at(thread_id);
        const auto perf_lat_rec = record_latency ? thread_perf_lat.at(thread_id).get() : nullptr;
        gestalt::Client client(config_path, client_id * 1000 + thread_id);

        /* requests to invalid entries will not travel the same I/O path, thus
            polluting benchmark result, we should skip them */
        unordered_set<std::string> invalid_entries;

        /* busy-wait until the main thread releases the workload */
        while (!start_flag)
            [[unlikely]] ;

        const auto &thread_run_workload = thread_run[thread_id];
        const auto thread_run_size = thread_run_workload.size();
        for (size_t i = 0; !stop_flag; i = (i + 1) % thread_run_size) {
            const auto &d = thread_run_workload[i];
            using Op = decltype(d.op);
            /* skip invalid entry */
            [[likely]] if constexpr (gestalt_bench::only_valid_data) {
                if (invalid_entries.count(d.okey))
                    [[unlikely]] continue;
            }
            if (perf_lat_rec)
                start_perf_op(*perf_lat_rec, d);
            int r = 0;
            bool retry;
            do {
                retry = false;
                switch (d.op) {
                case Op::READ: {
                    if (r = client.get(d.okey.c_str()); r) {
                        /* key temporarily locked */
                        [[unlikely]] if (r == -ECOMM) {
                            [[likely]] retry = true;
                            // total_retries++;
                            break;
                        }
                        if (r == -EAGAIN) {
                            [[likely]] if constexpr (!gestalt::optimization::allow_read_transient) {
                                retry = true;
                                // total_retries++;
                            }
                            break;
                        }
                        /* key not inserted */
                        if (r == -EINVAL)
                            [[likely]] break;
                        BOOST_LOG_TRIVIAL(error) << "failed to read " << d.okey
                            << " : " << std::strerror(-r);
                        errno = -r;
                        boost_log_errno_throw(Client::get);
                    }
                    break;
                }
                case Op::UPDATE: {
                    if constexpr (!gestalt::optimization::use_gestalt_bufferlist) {
                        uint8_t buf[4_K];
                        std::strcpy(reinterpret_cast<char*>(buf), d.okey.c_str());
                        r = client.put(d.okey.c_str(), buf, sizeof(buf));
                    }
                    else {
                        auto &bl = client.write_op->buf;
                        auto &slot = bl.data()[0];
                        slot.meta.set_key(d.okey);
                        std::strcpy(reinterpret_cast<char*>(slot.data.get()), d.okey.c_str());
                        bl.reformat(4_K);
                        r = client.put();
                    }
                    if (r) {
                        /* key temporarily locked */
                        [[unlikely]] if (r == -EBUSY) {
                            [[unlikely]] retry = true;
                            // total_retries++;
                            break;
                        }
                        /* key not inserted */
                        [[unlikely]] if (r == -EDQUOT)
                            [[likely]] break;
                        BOOST_LOG_TRIVIAL(error) << "failed to update " << d.okey
                            << " : " << std::strerror(-r);
                        errno = -r;
                        boost_log_errno_throw(Client::put);
                    }
                    break;
                }
                default:
                    throw std::runtime_error("unexpected run op");
                }
            } while (retry);
            /* record invalid entry, and exclude them from metrics */
            if constexpr (gestalt_bench::only_valid_data) {
                if (r) {
                    [[unlikely]] invalid_entries.insert(d.okey);
                    continue;
                }
            }
            completed_ops++;
            if (perf_lat_rec)
                end_perf_op(*perf_lat_rec, d);
        }

        if (!stop_flag)
            throw std::runtime_error("trace ended prematurely");
    };

    if (record_latency)
        reset_perf();

    vector<std::jthread> test_thread_pool;
    for (unsigned i = 0; i < thread_nr_to_test; i++)
        test_thread_pool.push_back(std::jthread(thread_test_fn, i));
    BOOST_LOG_TRIVIAL(info) << "started workload threads";

    /* wall-clock time in microseconds since the system_clock epoch */
    const auto us_since_epoch = [] () -> int64_t
    {
        return std::chrono::duration_cast<std::chrono::microseconds>(
                std::chrono::system_clock::now().time_since_epoch()
            ).count();
    };
    BOOST_LOG_TRIVIAL(debug) << "now is " << us_since_epoch();

    /* acquire sentinel from kv, retry if effective value not published by
        starter yet */
    /**
     * @note Gestalt client cannot successfully disconnect RDMA within timeout
     * in the dev env, blocking subsequent benchmark procedure. Not sure if it
     * is a soft RoCE issue, so just don't disconnect until the entire benchmark
     * is finished.
     */
    gestalt::Client coord_client(config_path, client_id + 1919880);
    int64_t start_ts = 0, end_ts = 0;
    {
        BOOST_LOG_TRIVIAL(debug) << "now is " << us_since_epoch();
        /* poll every half second until both timestamps are non-zero */
        while (!start_ts || !end_ts) {
            std::this_thread::sleep_for(.5s);
            if (int r = coord_client.get("start_at"); r)
                continue;
            start_ts = *reinterpret_cast<int64_t*>(coord_client.read_op->buf.data());
            if (int r = coord_client.get("end_at"); r)
                continue;
            end_ts = *reinterpret_cast<int64_t*>(coord_client.read_op->buf.data());
        }
    }
    BOOST_LOG_TRIVIAL(info) << "received valid start_ts " << start_ts
        << " end_ts " << end_ts;

    /* create lock file to mark follower is busy running workload */
    {
        ofstream lockf(lock_file_path);
    }

    BOOST_LOG_TRIVIAL(debug) << "waiting for start, now is " << us_since_epoch();

    /* follower should at least wait for one spin to make sure workload will
        start immediately after passing start_ts */
    {
        bool has_waited = false;
        while (us_since_epoch() < start_ts)
            [[unlikely]] has_waited = true;
        start_flag = true;
        if (!has_waited) {
            BOOST_LOG_TRIVIAL(fatal) << "thread initialization took too long, "
                << "expected to finish before " << start_ts << ", now is "
                << us_since_epoch();
            stop_flag = true;
            filesystem::remove(lock_file_path);
            throw std::runtime_error("thread initialization took too long");
        }
    }
    BOOST_LOG_TRIVIAL(info) << "Test started";

    /* on reaching end_ts, tell all worker threads to stop */
    {
        bool has_waited = false;
        while (us_since_epoch() < end_ts)
            [[unlikely]] has_waited = true;
        if (!has_waited) {
            BOOST_LOG_TRIVIAL(warning) << "Test seems to have run slightly "
                << "longer than expected " << end_ts
                << ", now is " << us_since_epoch() << ". "
                << "This is somewhat expected in the dev env.";
        }
    }
    stop_flag = true;
    BOOST_LOG_TRIVIAL(info) << "Test should now be terminated";
    // test_thread_pool.clear();

    /* NOTE(review): workers are not joined before this point (the jthreads
        are only joined when test_thread_pool is destroyed at the end of
        main), so a worker may still be finishing its last operation while
        the recorders and counters below are read -- TODO confirm this is
        acceptable */
    if (record_latency) {
        for (unsigned i = 0; i < thread_run.size(); i++) {
            ostringstream pfx;
            pfx << "node" << client_id << "_thrd" << i << "_";
            thread_perf_lat.at(i)->dump(
                record_latency_basedir / (pfx.str() + "read.csv"),
                record_latency_basedir / (pfx.str() + "write.csv"));
        }
        BOOST_LOG_TRIVIAL(info) << "Dumped recorded latency to " << record_latency_basedir;
    }

    BOOST_LOG_TRIVIAL(info) << "total_completed_ops "
        << std::accumulate(thread_completed_ops.begin(), thread_completed_ops.end(), 0ull)
        << std::flush;

    /* delete the lock file */
    if (auto r = filesystem::remove(lock_file_path); !r) {
        BOOST_LOG_TRIVIAL(fatal) << "lock file " << lock_file_path << " not removed by me";
        throw std::runtime_error("lock file not removed by me");
    }

    return EXIT_SUCCESS;
}
